In [1]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

3.1. Spark DataFrames & Pandas Plotting - Python

Create Dataproc Cluster with Jupyter

This notebook is designed to be run on Google Cloud Dataproc.

Follow the links below for instructions on how to create a Dataproc Cluster with the Jupyter component installed.

Python 3 Kernel

Use a Python 3 kernel (not PySpark) so that you can configure the SparkSession in the notebook and include the spark-bigquery-connector jar, which is required to use the BigQuery Storage API.

Scala Version

Check which version of Scala you are running so you can include the correct spark-bigquery-connector jar.


In [2]:
!scala -version


cat: /release: No such file or directory
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL

Create Spark Session

Include the correct version of the spark-bigquery-connector jar

Scala version 2.11 - 'gs://spark-lib/bigquery/spark-bigquery-latest.jar'.

Scala version 2.12 - 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'.
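If you would rather not hard-code the jar path, here is a minimal sketch that derives it from the scala -version output, assuming scala is on the PATH and using the two jar URIs listed above:

import re
import subprocess

# `scala -version` prints its banner to stderr, so merge it into stdout
result = subprocess.run(
    ['scala', '-version'],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
    universal_newlines=True)

# Pull out the major.minor version, e.g. "2.11" from "version 2.11.12"
match = re.search(r'version (\d+\.\d+)', result.stdout)
scala_version = match.group(1) if match else '2.11'  # fall back to 2.11 if parsing fails

connector_jars = {
    '2.11': 'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
    '2.12': 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar',
}
print(connector_jars[scala_version])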


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName('Spark DataFrames & Pandas Plotting')\
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \
  .getOrCreate()

Enable repl.eagerEval

This outputs the results of DataFrames at each step without the need to call df.show(), and also improves the formatting of the output.


In [4]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

Read BigQuery table into Spark DataFrame

Use filter() to query data from a partitioned table.


In [5]:
table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()


root
 |-- datehour: timestamp (nullable = true)
 |-- wiki: string (nullable = true)
 |-- title: string (nullable = true)
 |-- views: long (nullable = true)

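To confirm the filter is pushed down to BigQuery rather than applied after the read, you can inspect the physical plan; the pushed filter should appear in the BigQuery scan node:

# The datehour filter should show up in the scan, meaning only one day
# of the partitioned table is read
df_wiki_pageviews.explain()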
Select the required columns and apply a filter using where(), which is an alias for filter(), then cache the table.


In [6]:
df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \
  .cache()

df_wiki_en


Out[6]:
datehour            wiki views
2020-03-01 23:00:00 en   3242
2020-03-01 22:00:00 en   2368
2020-03-01 22:00:00 en   2360
2020-03-01 22:00:00 en   2223
2020-03-01 22:00:00 en   1398
2020-03-01 23:00:00 en   1872
2020-03-01 23:00:00 en   136620
2020-03-01 23:00:00 en   1084
2020-03-01 23:00:00 en   1946
2020-03-01 23:00:00 en   8313
2020-03-01 23:00:00 en   1084
2020-03-01 22:00:00 en   3524
2020-03-01 22:00:00 en   1328
2020-03-01 22:00:00 en   1297
2020-03-01 23:00:00 en   1968
2020-03-01 22:00:00 en   1139
2020-03-01 22:00:00 en   1006
2020-03-01 23:00:00 en   1511
2020-03-01 23:00:00 en   1526
2020-03-01 22:00:00 en   1405
only showing top 20 rows
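Note that cache() is lazy: nothing is stored until an action runs. An action such as count() materializes the cache so that the aggregations below reuse the in-memory data:

# The first action scans BigQuery and populates the cache;
# subsequent queries on df_wiki_en read from memory
df_wiki_en.count()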

Group by datehour and sum the views, then order by total views to see the busiest hours.


In [7]:
import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)


Out[7]:
datehour            total_views
2020-03-01 21:00:00 1642981
2020-03-01 06:00:00 1591160
2020-03-01 22:00:00 1541455
2020-03-01 17:00:00 1535983
2020-03-01 18:00:00 1495387
2020-03-01 16:00:00 1487786
2020-03-01 05:00:00 1469068
2020-03-01 07:00:00 1458756
2020-03-01 20:00:00 1457051
2020-03-01 15:00:00 1446984
2020-03-01 19:00:00 1427811
2020-03-01 14:00:00 1372760
2020-03-01 23:00:00 1353548
2020-03-01 08:00:00 1353292
2020-03-01 03:00:00 1339853
2020-03-01 04:00:00 1312186
2020-03-01 12:00:00 1225647
2020-03-01 13:00:00 1212003
2020-03-01 10:00:00 1211310
2020-03-01 09:00:00 1200977
only showing top 20 rows

Convert Spark DataFrame to Pandas DataFrame

Convert the Spark DataFrame to a Pandas DataFrame and set datehour as the index.


In [8]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
%time pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()


CPU times: user 15.8 ms, sys: 4 ms, total: 19.8 ms
Wall time: 3.05 s
Out[8]:
total_views
datehour
2020-03-01 22:00:00 1541455
2020-03-01 09:00:00 1200977
2020-03-01 12:00:00 1225647
2020-03-01 20:00:00 1457051
2020-03-01 10:00:00 1211310
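To see what Arrow buys you here, you can time the same conversion with Arrow disabled (timings vary by cluster; this is only an illustrative comparison):

spark.conf.set("spark.sql.execution.arrow.enabled", "false")
%time pandas_no_arrow = df_datehour_totals.toPandas()

# Re-enable Arrow for the remaining conversions
spark.conf.set("spark.sql.execution.arrow.enabled", "true")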

Plotting Pandas DataFrame

Import matplotlib


In [9]:
import matplotlib.pyplot as plt

Use the Pandas plot function to create a line chart


In [10]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
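toPandas() does not guarantee row order, so the datehour index may be unsorted (as the head() output above shows). Sorting the index first draws the line in chronological order:

# Sort the datetime index so the line runs left to right in time
pandas_datehour_totals.sort_index().plot(kind='line', figsize=(12,6));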


Plot Multiple Columns

Create a new Spark DataFrame and pivot on the wiki column to create a separate column for each wiki value.


In [11]:
import pyspark.sql.functions as F

df_wiki_totals = df_wiki_en \
  .groupBy("datehour") \
  .pivot("wiki") \
  .agg(F.sum('views').alias('total_views'))

df_wiki_totals


Out[11]:
datehour            en     en.m
2020-03-01 22:00:00 558358 983097
2020-03-01 09:00:00 638692 562285
2020-03-01 12:00:00 633432 592215
2020-03-01 20:00:00 615714 841337
2020-03-01 05:00:00 588808 880260
2020-03-01 10:00:00 644680 566630
2020-03-01 14:00:00 685500 687260
2020-03-01 19:00:00 592967 834844
2020-03-01 03:00:00 391300 948553
2020-03-01 01:00:00 360511 783510
2020-03-01 04:00:00 383489 928697
2020-03-01 18:00:00 645590 849797
2020-03-01 00:00:00 382154 758920
2020-03-01 07:00:00 839531 619225
2020-03-01 08:00:00 783419 569873
2020-03-01 13:00:00 619111 592892
2020-03-01 11:00:00 594027 577016
2020-03-01 15:00:00 695881 751103
2020-03-01 16:00:00 661878 825908
2020-03-01 23:00:00 484077 869471
only showing top 20 rows
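If you know the pivot values up front, you can pass them to pivot() explicitly; Spark then skips the extra job it would otherwise run to discover the distinct wiki values:

df_wiki_totals_explicit = df_wiki_en \
  .groupBy("datehour") \
  .pivot("wiki", ["en", "en.m"]) \
  .agg(F.sum('views').alias('total_views'))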

Convert to Pandas DataFrame


In [12]:
pandas_wiki_totals = df_wiki_totals.toPandas()

pandas_wiki_totals.set_index('datehour', inplace=True)
pandas_wiki_totals.head()


Out[12]:
en en.m
datehour
2020-03-01 22:00:00 558358 983097
2020-03-01 09:00:00 638692 562285
2020-03-01 12:00:00 633432 592215
2020-03-01 20:00:00 615714 841337
2020-03-01 10:00:00 644680 566630

Create plot with line for each column


In [13]:
pandas_wiki_totals.plot(kind='line',figsize=(12,6))


Out[13]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fb386d9d7f0>

Create stacked area plot


In [14]:
pandas_wiki_totals.plot.area(figsize=(12,6))


Out[14]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fb386f712b0>
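To keep a chart beyond the notebook session, you can save the figure behind the returned axes (a minimal sketch; the filename is arbitrary):

ax = pandas_wiki_totals.plot.area(figsize=(12,6))
ax.get_figure().savefig('wiki_pageviews_area.png', dpi=150)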